{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Name\n",
    "Data preparation by using a template to submit a job to Cloud Dataflow\n",
    "\n",
    "# Labels\n",
    "GCP, Cloud Dataflow, Kubeflow, Pipeline\n",
    "\n",
    "# Summary\n",
    "A Kubeflow Pipeline component to prepare data by using a template to submit a job to Cloud Dataflow.\n",
    "\n",
    "# Details\n",
    "\n",
    "## Intended use\n",
    "Use this component when you have a pre-built Cloud Dataflow template and want to launch it as a step in a Kubeflow Pipeline.\n",
    "\n",
    "## Runtime arguments\n",
    "Argument        | Description                 | Optional   | Data type  | Accepted values | Default    |\n",
    ":---            | :----------                 | :----------| :----------| :----------     | :----------|\n",
    "project_id | The ID of the Google Cloud Platform (GCP) project to which the job belongs. | No | GCPProjectID |  |  |\n",
    "gcs_path | The path to a Cloud Storage bucket containing the job creation template. It must be a valid Cloud Storage URL beginning with 'gs://'. | No  | GCSPath  |  |  |\n",
    "launch_parameters | The parameters that are required to launch the template. The schema is defined in [LaunchTemplateParameters](https://cloud.google.com/dataflow/docs/reference/rest/v1b3/LaunchTemplateParameters). The parameter `jobName` is replaced by a generated name. | Yes  |  Dict | A JSON object which has the same structure as [LaunchTemplateParameters](https://cloud.google.com/dataflow/docs/reference/rest/v1b3/LaunchTemplateParameters) | None |\n",
    "location | The regional endpoint to which the job request is directed.| Yes  |  GCPRegion |    |  None |\n",
    "staging_dir |  The path to the Cloud Storage directory where the staging files are stored. A random subdirectory will be created under the staging directory to keep the job information. This is done so that you can resume the job in case of failure.|  Yes |  GCSPath |   |  None |\n",
    "validate_only | If True, the request is validated but not executed.   |  Yes  |  Boolean |  |  False |\n",
    "wait_interval | The number of seconds to wait between calls to get the status of the job. |  Yes  | Integer  |   |  30 |\n",
    "\n",
    "## Input data schema\n",
    "\n",
    "The input `gcs_path` must contain a valid Cloud Dataflow template. The template can be created by following the instructions in [Creating Templates](https://cloud.google.com/dataflow/docs/guides/templates/creating-templates). You can also use [Google-provided templates](https://cloud.google.com/dataflow/docs/guides/templates/provided-templates).\n",
    "\n",
    "## Output\n",
    "Name | Description\n",
    ":--- | :----------\n",
    "job_id | The id of the Cloud Dataflow job that is created.\n",
    "\n",
    "## Caution & requirements\n",
    "\n",
    "To use the component, the following requirements must be met:\n",
    "- Cloud Dataflow API is enabled.\n",
    "- The component can authenticate to GCP. Refer to [Authenticating Pipelines to GCP](https://www.kubeflow.org/docs/gke/authentication-pipelines/) for details.\n",
    "- The Kubeflow user service account is a member of:\n",
    "    - `roles/dataflow.developer` role of the project.\n",
    "    - `roles/storage.objectViewer` role of the Cloud Storage Object `gcs_path.`\n",
    "    - `roles/storage.objectCreator` role of the Cloud Storage Object `staging_dir.` \n",
    "\n",
    "## Detailed description\n",
    "You can execute the template locally by following the instructions in [Executing Templates](https://cloud.google.com/dataflow/docs/guides/templates/executing-templates). See the sample code below to learn how to execute the template.\n",
    "Follow these steps to use the component in a pipeline:\n",
    "1. Install the Kubeflow Pipeline SDK:\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%capture --no-stderr\n",
    "\n",
    "!pip3 install kfp --upgrade"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "2. Load the component using KFP SDK"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import kfp.components as comp\n",
    "\n",
    "dataflow_template_op = comp.load_component_from_url(\n",
    "    'https://raw.githubusercontent.com/kubeflow/pipelines/1.7.0-rc.2/components/gcp/dataflow/launch_template/component.yaml')\n",
    "help(dataflow_template_op)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Sample\n",
    "\n",
    "Note: The following sample code works in an IPython notebook or directly in Python code.\n",
    "In this sample, we run a Google-provided word count template from `gs://dataflow-templates/latest/Word_Count`. The template takes a text file as input and outputs word counts to a Cloud Storage bucket. Here is the sample input:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!gsutil cat gs://dataflow-samples/shakespeare/kinglear.txt"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Set sample parameters"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "tags": [
     "parameters"
    ]
   },
   "outputs": [],
   "source": [
    "# Required Parameters\n",
    "PROJECT_ID = '<Please put your project ID here>'\n",
    "GCS_WORKING_DIR = 'gs://<Please put your GCS path here>' # No ending slash"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional Parameters\n",
    "EXPERIMENT_NAME = 'Dataflow - Launch Template'\n",
    "OUTPUT_PATH = '{}/out/wc'.format(GCS_WORKING_DIR)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Example pipeline that uses the component"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import kfp.dsl as dsl\n",
    "import json\n",
    "@dsl.pipeline(\n",
    "    name='Dataflow launch template pipeline',\n",
    "    description='Dataflow launch template pipeline'\n",
    ")\n",
    "def pipeline(\n",
    "    project_id = PROJECT_ID, \n",
    "    gcs_path = 'gs://dataflow-templates/latest/Word_Count', \n",
    "    launch_parameters = json.dumps({\n",
    "       'parameters': {\n",
    "           'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',\n",
    "           'output': OUTPUT_PATH\n",
    "       }\n",
    "    }), \n",
    "    location = '',\n",
    "    validate_only = 'False', \n",
    "    staging_dir = GCS_WORKING_DIR,\n",
    "    wait_interval = 30):\n",
    "    dataflow_template_op(\n",
    "        project_id = project_id, \n",
    "        gcs_path = gcs_path, \n",
    "        launch_parameters = launch_parameters, \n",
    "        location = location, \n",
    "        validate_only = validate_only,\n",
    "        staging_dir = staging_dir,\n",
    "        wait_interval = wait_interval)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Compile the pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pipeline_func = pipeline\n",
    "pipeline_filename = pipeline_func.__name__ + '.zip'\n",
    "import kfp.compiler as compiler\n",
    "compiler.Compiler().compile(pipeline_func, pipeline_filename)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Submit the pipeline for execution"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#Specify pipeline argument values\n",
    "arguments = {}\n",
    "\n",
    "#Get or create an experiment and submit a pipeline run\n",
    "import kfp\n",
    "client = kfp.Client()\n",
    "experiment = client.create_experiment(EXPERIMENT_NAME)\n",
    "\n",
    "#Submit a pipeline run\n",
    "run_name = pipeline_func.__name__ + ' run'\n",
    "run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Inspect the output"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!gsutil cat $OUTPUT_PATH*"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## References\n",
    "\n",
    "* [Component python code](https://github.com/kubeflow/pipelines/blob/master/components/gcp/container/component_sdk/python/kfp_component/google/dataflow/_launch_template.py)\n",
    "* [Component docker file](https://github.com/kubeflow/pipelines/blob/master/components/gcp/container/Dockerfile)\n",
    "* [Sample notebook](https://github.com/kubeflow/pipelines/blob/master/components/gcp/dataflow/launch_template/sample.ipynb)\n",
    "* [Cloud Dataflow Templates overview](https://cloud.google.com/dataflow/docs/guides/templates/overview)\n",
    "\n",
    "## License\n",
    "By deploying or using this software you agree to comply with the [AI Hub Terms of Service](https://aihub.cloud.google.com/u/0/aihub-tos) and the [Google APIs Terms of Service](https://developers.google.com/terms/). To the extent of a direct conflict of terms, the AI Hub Terms of Service will control.\n"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.4"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}